{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Vowpal Wabbit and LightGBM for a Regression Problem\n",
    "\n",
    "This notebook shows how to build simple regression models by using \n",
    "[Vowpal Wabbit (VW)](https://github.com/VowpalWabbit/vowpal_wabbit) and \n",
    "[LightGBM](https://github.com/microsoft/LightGBM) with SynapseML.\n",
    " We also compare the results with \n",
    " [Spark MLlib Linear Regression](https://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import math\n",
    "from synapse.ml.train import ComputeModelStatistics\n",
    "from synapse.ml.vw import VowpalWabbitRegressor, VowpalWabbitFeaturizer\n",
    "from synapse.ml.lightgbm import LightGBMRegressor\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "from pyspark.ml.regression import LinearRegression\n",
    "from sklearn.datasets import fetch_california_housing\n",
    "import requests"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prepare Dataset\n",
    "We use [*California Housing* dataset](https://scikit-learn.org/stable/datasets/real_world.html#california-housing-dataset). \n",
    "The data was derived from the 1990 U.S. census. It consists of 20640 entries with 8 features. \n",
    "We use `sklearn.datasets` module to download it easily, then split the set into training and testing by 75/25."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "with open(\"cal_housing_py3.pkz\", \"wb\") as f:\n",
    "    f.write(\n",
    "        requests.get(\n",
    "            \"https://mmlspark.blob.core.windows.net/datasets/cal_housing_py3.pkz\"\n",
    "        ).content\n",
    "    )\n",
    "\n",
    "california = fetch_california_housing(data_home=\".\", download_if_missing=False)\n",
    "\n",
    "feature_cols = [\"f\" + str(i) for i in range(california.data.shape[1])]\n",
    "header = [\"target\"] + feature_cols\n",
    "df = spark.createDataFrame(\n",
    "    pd.DataFrame(\n",
    "        data=np.column_stack((california.target, california.data)), columns=header\n",
    "    )\n",
    ").repartition(1)\n",
    "print(\"Dataframe has {} rows\".format(df.count()))\n",
    "display(df.limit(10).toPandas())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data, test_data = df.randomSplit([0.75, 0.25], seed=42)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Following is the summary of the training set."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(train_data.summary().toPandas())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Plot feature distributions over different target values (house prices in our case)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "features = train_data.columns[1:]\n",
    "values = train_data.drop(\"target\").toPandas()\n",
    "ncols = 5\n",
    "nrows = math.ceil(len(features) / ncols)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Baseline - Spark MLlib Linear Regressor\n",
    "\n",
    "First, we set a baseline performance by using Linear Regressor in Spark MLlib."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n",
    "lr_train_data = featurizer.transform(train_data)[\"target\", \"features\"]\n",
    "lr_test_data = featurizer.transform(test_data)[\"target\", \"features\"]\n",
    "display(lr_train_data.limit(10))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# By default, `maxIter` is 100. Other params you may want to change include: `regParam`, `elasticNetParam`, etc.\n",
    "lr = LinearRegression(labelCol=\"target\")\n",
    "\n",
    "lr_model = lr.fit(lr_train_data)\n",
    "lr_predictions = lr_model.transform(lr_test_data)\n",
    "\n",
    "display(lr_predictions.limit(10))"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We evaluate the prediction result by using `synapse.ml.train.ComputeModelStatistics` which returns four metrics:\n",
    "* [MSE (Mean Squared Error)](https://en.wikipedia.org/wiki/Mean_squared_error)\n",
    "* [RMSE (Root Mean Squared Error)](https://en.wikipedia.org/wiki/Root-mean-square_deviation) = sqrt(MSE)\n",
    "* [R Squared](https://en.wikipedia.org/wiki/Coefficient_of_determination)\n",
    "* [MAE (Mean Absolute Error)](https://en.wikipedia.org/wiki/Mean_absolute_error)"
   ]
  },
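  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For reference, these four metrics are conventionally defined as follows for $n$ test rows with labels $y_i$, predictions $\\hat{y}_i$, and label mean $\\bar{y}$. These are the standard textbook definitions; we assume, rather than verify here, that `ComputeModelStatistics` follows them.\n",
    "\n",
    "$$\n",
    "\\begin{aligned}\n",
    "\\mathrm{MSE} &= \\frac{1}{n}\\sum_{i=1}^{n}(y_i - \\hat{y}_i)^2 \\\\\n",
    "\\mathrm{RMSE} &= \\sqrt{\\mathrm{MSE}} \\\\\n",
    "R^2 &= 1 - \\frac{\\sum_{i=1}^{n}(y_i - \\hat{y}_i)^2}{\\sum_{i=1}^{n}(y_i - \\bar{y})^2} \\\\\n",
    "\\mathrm{MAE} &= \\frac{1}{n}\\sum_{i=1}^{n}\\lvert y_i - \\hat{y}_i \\rvert\n",
    "\\end{aligned}\n",
    "$$\n",
    "\n",
    "As a small worked example, take labels $(1, 2, 3)$ and predictions $(1, 2, 6)$. The errors are $(0, 0, -3)$, so $\\mathrm{MSE} = 9/3 = 3$, $\\mathrm{RMSE} = \\sqrt{3} \\approx 1.73$, and $\\mathrm{MAE} = 3/3 = 1$. With $\\bar{y} = 2$, the denominator of $R^2$ is $1 + 0 + 1 = 2$, giving $R^2 = 1 - 9/2 = -3.5$. A negative $R^2$ means the model does worse than always predicting the mean."
   ]
  },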
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "metrics = ComputeModelStatistics(\n",
    "    evaluationMetric=\"regression\", labelCol=\"target\", scoresCol=\"prediction\"\n",
    ").transform(lr_predictions)\n",
    "\n",
    "results = metrics.toPandas()\n",
    "results.insert(0, \"model\", [\"Spark MLlib - Linear Regression\"])\n",
    "display(results)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Vowpal Wabbit"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Perform VW-style feature hashing. Many types (numbers, string, bool, map of string to (number, string)) are supported."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "vw_featurizer = VowpalWabbitFeaturizer(inputCols=feature_cols, outputCol=\"features\")\n",
    "\n",
    "vw_train_data = vw_featurizer.transform(train_data)[\"target\", \"features\"]\n",
    "vw_test_data = vw_featurizer.transform(test_data)[\"target\", \"features\"]\n",
    "display(vw_train_data.limit(10))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "See [VW wiki](https://github.com/vowpalWabbit/vowpal_wabbit/wiki/Command-Line-Arguments) for command line arguments."
   ]
  },
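  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The arguments in the next cell select VW's quantile loss (`--loss_function quantile`), and the LightGBM model later in this notebook uses `objective=\"quantile\"` with `alpha=0.2`. As a reminder, the quantile (pinball) loss for a target quantile $\\tau \\in (0, 1)$ is usually written as shown here. This is the standard definition, not something taken from either library's source.\n",
    "\n",
    "$$\n",
    "\\ell_\\tau(y, \\hat{y}) =\n",
    "\\begin{cases}\n",
    "\\tau \\, (y - \\hat{y}) & \\text{if } y \\ge \\hat{y} \\\\\n",
    "(1 - \\tau) \\, (\\hat{y} - y) & \\text{if } y < \\hat{y}\n",
    "\\end{cases}\n",
    "$$\n",
    "\n",
    "With $\\tau = 0.5$ this is half the absolute error, so minimizing it estimates the conditional median. Because the VW arguments do not set `--quantile_tau`, VW should be using its default, which we understand to be 0.5. With $\\tau = 0.2$ (the `alpha` value passed to LightGBM), an over-prediction costs four times as much as an under-prediction of the same size ($0.8$ versus $0.2$ per unit of error), so the fitted model targets the 20th percentile and its predictions tend to sit below the mean. Keep this in mind when comparing its MSE with the least-squares baseline."
   ]
  },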
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use the same number of iterations as Spark MLlib's Linear Regression (=100)\n",
    "args = \"--holdout_off --loss_function quantile -l 0.004 -q :: --power_t 0.3\"\n",
    "vwr = VowpalWabbitRegressor(labelCol=\"target\", passThroughArgs=args, numPasses=100)\n",
    "\n",
    "# To reduce number of partitions (which will effect performance), use `vw_train_data.repartition(1)`\n",
    "vw_train_data_2 = vw_train_data.repartition(1).cache()\n",
    "print(vw_train_data_2.count())\n",
    "vw_model = vwr.fit(vw_train_data_2.repartition(1))\n",
    "vw_predictions = vw_model.transform(vw_test_data)\n",
    "\n",
    "display(vw_predictions.limit(10))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "metrics = ComputeModelStatistics(\n",
    "    evaluationMetric=\"regression\", labelCol=\"target\", scoresCol=\"prediction\"\n",
    ").transform(vw_predictions)\n",
    "\n",
    "vw_result = metrics.toPandas()\n",
    "vw_result.insert(0, \"model\", [\"Vowpal Wabbit\"])\n",
    "results = pd.concat([results, vw_result], ignore_index=True)\n",
    "\n",
    "display(results)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## LightGBM"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lgr = LightGBMRegressor(\n",
    "    objective=\"quantile\",\n",
    "    alpha=0.2,\n",
    "    learningRate=0.3,\n",
    "    numLeaves=31,\n",
    "    labelCol=\"target\",\n",
    "    numIterations=100,\n",
    ")\n",
    "\n",
    "repartitioned_data = lr_train_data.repartition(1).cache()\n",
    "print(repartitioned_data.count())\n",
    "lg_model = lgr.fit(repartitioned_data)\n",
    "lg_predictions = lg_model.transform(lr_test_data)\n",
    "\n",
    "display(lg_predictions.limit(10))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "metrics = ComputeModelStatistics(\n",
    "    evaluationMetric=\"regression\", labelCol=\"target\", scoresCol=\"prediction\"\n",
    ").transform(lg_predictions)\n",
    "\n",
    "lg_result = metrics.toPandas()\n",
    "lg_result.insert(0, \"model\", [\"LightGBM\"])\n",
    "\n",
    "results = pd.concat([results, lg_result], ignore_index=True)\n",
    "\n",
    "display(results)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Following figure shows the actual-vs.-prediction graphs of the results:\n",
    "\n",
    "<img width=\"1102\" alt=\"lr-vw-lg\" src=\"https://mmlspark.blob.core.windows.net/graphics/Documentation/regression_comparison.png\" />"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from matplotlib.colors import ListedColormap, Normalize\n",
    "from matplotlib.cm import get_cmap\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "f, axes = plt.subplots(nrows, ncols, sharey=True, figsize=(30, 10))\n",
    "f.tight_layout()\n",
    "yy = [r[\"target\"] for r in train_data.select(\"target\").collect()]\n",
    "for irow in range(nrows):\n",
    "    axes[irow][0].set_ylabel(\"target\")\n",
    "    for icol in range(ncols):\n",
    "        try:\n",
    "            feat = features[irow * ncols + icol]\n",
    "            xx = values[feat]\n",
    "            axes[irow][icol].scatter(xx, yy, s=10, alpha=0.25)\n",
    "            axes[irow][icol].set_xlabel(feat)\n",
    "            axes[irow][icol].get_yaxis().set_ticks([])\n",
    "        except IndexError:\n",
    "            f.delaxes(axes[irow][icol])\n",
    "\n",
    "cmap = get_cmap(\"YlOrRd\")\n",
    "\n",
    "target = np.array(test_data.select(\"target\").collect()).flatten()\n",
    "model_preds = [\n",
    "    (\"Spark MLlib Linear Regression\", lr_predictions),\n",
    "    (\"Vowpal Wabbit\", vw_predictions),\n",
    "    (\"LightGBM\", lg_predictions),\n",
    "]\n",
    "\n",
    "f, axes = plt.subplots(1, len(model_preds), sharey=True, figsize=(18, 6))\n",
    "f.tight_layout()\n",
    "\n",
    "for i, (model_name, preds) in enumerate(model_preds):\n",
    "    preds = np.array(preds.select(\"prediction\").collect()).flatten()\n",
    "    err = np.absolute(preds - target)\n",
    "\n",
    "    norm = Normalize()\n",
    "    clrs = cmap(np.asarray(norm(err)))[:, :-1]\n",
    "    axes[i].scatter(preds, target, s=60, c=clrs, edgecolors=\"#888888\", alpha=0.75)\n",
    "    axes[i].plot((0, 60), (0, 60), linestyle=\"--\", color=\"#888888\")\n",
    "    axes[i].set_xlabel(\"Predicted values\")\n",
    "    if i == 0:\n",
    "        axes[i].set_ylabel(\"Actual values\")\n",
    "    axes[i].set_title(model_name)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  },
  "name": "synapseml example - regression",
  "notebookId": 1395284431467721
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
